1. Overview
之前提到过, ThreadPoolExecutor的内部类Worker维护了一个非常基本的锁结构. 方法不多, 可以由名知义:
- lock
- tryLock
- unlock
- isLocked
Worker本身继承了AbstractQueuedSynchronizer(AQS)来实现上述功能的. 事实上, AQS是Java中数个并发工具的基础, 功能也比较复杂. 我们先从这些并发工具中最简单的Worker开始, 之后逐步递进:
- ThreadPoolExecutor#Worker
- CountDownLatch
- Semaphore
- FutureTask (JDK 1.6)
- ReentrantLock
- ConditionObject
- ReadWriteReentrantLock
- AQS总结
2. 一个最简单的独占锁
我们可以基于CAS实现一种锁. 代码如下:
1 | import java.util.concurrent.atomic.AtomicBoolean; |
- 类比我们的布尔变量
lockState, AQS中使用的是一个整形字段 -state. 在互斥(即同时只能由一个线程持有锁)的场景下,state可能的取值只有0 / 1两种, 分别对应false,true. tryLock只是简单的尝试获取锁, 而在实际的lock方法中, 等待获取锁的线程都是被阻塞住的. 而这些线程的引用, 被维护于一个队列结构中.
3. A Thread-Safe Queue
AQS中另一个主体是一个基于双向链表的队列结构. 不考虑AQS逻辑, 实现如下:
1 | /** |
初始状态下, head, tail指向同一个节点. 如果没有出队列操作, 只是修改tail引用的节点. 观察enqueue方法, 思路和Atomic工具中的getAndSet思路是一致的. 示例中的类和变量在AQS中都可以找到对应.
4. Worker
AQS是一个抽象类. 其他工具都是以继承的方式覆盖AQS的几个钩子方法.
4.1 tryLock
Worker#tryLock和我们的CASLock的基本思想是一致的, 即通过一个原子变量来表示当前锁的状态, 只不过CASLock中使用的是true/false, 而Worker中使用的是1/0, 维持这个状态的是继承于AQS的state字段.
从下面的流程图可以看出来整体逻辑非常简单. 从这里开始, 稍微值得关注是一个问题是哪些方法是Worker重载的, 而哪些是继承的.

当成功拿到锁之后, 会将exclusiveOwnerThread设为当前线程, 根据名称可以看出来, 仅当独占时才会用到此变量.
4.2 isLocked
同样是基于state的简单逻辑: 即AQS#getState() == 0.
4.3 lock
lock的方法比较复杂. 如图:
- 进入方法时, 首先进行了一次
tryAcquire, 如果成功那真是最好的结局. 否则开始”等待”. - 等待指的就是当前线程被抽象为一个节点, 进入了队列尾端(See
addWaiter) . 就开始”等待”, 随即进入AQS#acquireQueued逻辑. acquireQueued内, 线程进入了一个循环体系中.- 仅当满足 当前节点前面节点为头结点 并且 tryAcquire成功 时才能跳出循环. 此时为成功获取锁.
- 不满足上述条件线程, 会被挂起(park). 如果一个线程此时无法获取锁, 注意它的前置节点
waitStatus初始为0,shouldParkAfterFailedAcquire方法中会将其置为Node.SIGNAL. 而在下次循环中, 此线程才被真正挂起.
看到这里, 如果对于下面几个问题有疑问, 先保留.
waitStatus是干什么的?- 线程在
parkAndCheckInterrupt中, 调用的是Thread.interrupted(). 如我们所知, 这个方法会清楚并返回线程的中断状态. 而acquireQueued中如果曾经被中断过, 就会把中断状态返回给acquire,acquire中调用selfInterrupt, 重新将当前线程中断状态设置为`true - 何时执行到
acquireQueued内部final块调用的cancelAcquire?. 在acquireQueue中的循环, 只有一个出口即满足p == head && tryAcquire(arg). 而此时fail将一定为false, 所以看似一定不会cancelAcquire
Worker看似是一个很典型的锁. 但是注意的是它的构造方法有这么一句1
setState(-1); // inhibit interrupts until runWorker
初始情况state == -1, 所以compareAndSet(0, 1)是永远无法成功的, 后面对此进行说明. 这里先假设, 初始状态state == 0. 基于一个简单的示例说明数据变化
1 | Worker worker = new Worker(); |
这个过程的数据变化如下图(Step 3可以留到下节后再回来看):

4.4 unlock
假如线程A获取了锁, 线程B在等待. 当前专题下
Worker的tryRelease方法是一定会成功的.
// TODO 流程图 + 实例
回顾
虽然前文曾经表示可以暂时将Worker看作一个ReentrantLock, 然后实际上两者行为还是有一些不一致的. 先来看下, ThreadPoolExecutor都有哪里调用了Worker的锁行为.
- 构造方法设置
state为-1. runWorker时先unlock- 在循环中, 每个task执行前后
lock,unlock interruptIdleWorkers前后tryLock,unlock
第1, 2点注释也有说明, 1是为了禁止被interrupt, 2是允许interrupt. 而且由于第一点的存在, 如果当做普通锁, 上来就lock()的话, 是会一直阻塞的, 所以前面把初始状态设置为0.
在大部分情况下, Woker这个同步工具并没有涉及到多线程间通信, 它只被线程池中的线程持有. 而当主线程试图interrupt线程池中线程时, 才有多个线程对锁的争用. ThreadPoolExecutor中
shutdown中会调用interruptIdleWorkers(tryLock成功再interrupt)shutdownNow调用interruptWorkers(对所有线程都interrupt).
这两个方法中一个会判断getState() >= 0, 一个会tryLock(). 所以当getState() == -1时两个操作都无效.
另外, Worker的tryRelease方法很粗暴. 如果有ReentrantLock使用经验会发现, 如果某线程不是锁的持有者但又试图unlock, 会抛出IllegalMonitorStateException. 而Worker则不然, A线程lock后B线程也可以unlock.
单独提取Worker中部分同步相关代码, 可以注意下那些方法是Override的, 以及暴露的方法和AQS的对应关系:
1 | @SuppressWarnings("serial") |